package com.example.springbootdockertest.controller.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;

/**
 * Kafka consumer test controller.
 *
 * <p>Exposes a single HTTP endpoint that creates a short-lived {@link KafkaConsumer},
 * polls the requested topic once, prints every record it received and publishes an
 * acknowledgement message for each of them through the injected {@link KafkaTemplate}.
 *
 * @Author liguangcheng
 * @Date 2021/10/5 11:12 AM
 * @Vision 1.0
 **/
@RestController
public class KafkaCustomerTest {
    /** Value of {@code enable.auto.commit} passed to the consumer, read from application config. */
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    /** Kafka bootstrap server list used by the ad-hoc consumer. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaConsumerBootstrapServers;
    /** Consumer group id used by the ad-hoc consumer. */
    @Value("${spring.kafka.consumer.properties.group.id}")
    private String group;
    /** Producer used to publish the acknowledgement message for each consumed record. */
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Polls the given topic once (waiting at most one second) and, for every record
     * returned, prints its details to standard output and sends an acknowledgement
     * message with the same key back to the record's topic.
     *
     * <p>NOTE(review): the acknowledgement is written to the same topic that is being
     * consumed, so a later call will presumably read those acknowledgements as regular
     * records - confirm this is intended.
     *
     * @param topic name of the Kafka topic to subscribe to, taken from the request path
     */
    @GetMapping("get/{topic}")
    public void get(@PathVariable("topic") String topic) {
        Properties props = buildConsumerProperties();

        // A plain set instead of double-brace initialization: the latter creates an
        // anonymous subclass that keeps a hidden reference to this controller.
        HashSet<String> topics = new HashSet<>();
        topics.add(topic);

        // The consumer is created per request, so it must be closed per request;
        // otherwise every call leaks its network resources and leaves a stale
        // member registered in the consumer group.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            kafkaConsumer.subscribe(topics);
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("record-->" + record);
                String topic1 = record.topic();
                System.out.println("topic-->" + topic1);
                String key = record.key();
                System.out.println("key-->" + key);
                System.out.println("value-->" + record.value());
                kafkaTemplate.send(topic1, key, "消费成功");
            }
        }
    }

    /**
     * Builds the configuration for the ad-hoc consumer from the injected settings.
     * Keys and values are both deserialized as strings.
     */
    private Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaConsumerBootstrapServers);
        props.setProperty("group.id", group);
        props.setProperty("enable.auto.commit", enableAutoCommit);
        // Optional tuning knobs intentionally left at client defaults:
        // max.poll.records, request.timeout.ms, max.poll.interval.ms.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
